package org.example.apitest.processfunction;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.example.apitest.beans.SensorReading;

public class ProcessTest1KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket中读取数据
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换为SensorReading类型
        DataStream<SensorReading> dataStream = inputStream
                .map((MapFunction<String, SensorReading>) SensorReading::new);


        dataStream.keyBy("id")
                .process(new MyProcessFunction())
                .print();
        env.execute();
    }

    public static class MyProcessFunction extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
        ValueState<Long> tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("ts-timer", Long.class));
        }

        @Override
        public void processElement(SensorReading value, KeyedProcessFunction<Tuple, SensorReading, Integer>.Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());

            // ctx
            ctx.timestamp();
            ctx.getCurrentKey();
//            ctx.output();
            ctx.timerService().currentProcessingTime();
            ctx.timerService().currentWatermark();
            ctx.timerService().registerProcessingTimeTimer(1000L);
            tsTimerState.update(ctx.timerService().currentProcessingTime()+1000L);
            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000);

            ctx.timerService().deleteEventTimeTimer((value.getTimestamp() + 10) * 1000);
//            ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, SensorReading, Integer>.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            System.out.println("定时器触发  " + timestamp);
            ctx.getCurrentKey();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            super.close();
            tsTimerState.clear();
        }
    }
}
